
Conversation

@tustvold
Contributor

Which issue does this PR close?

Closes #5885

Rationale for this change

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the core Core DataFusion crate label May 18, 2023
MemoryConsumer::new(format!("ExternalSorterMerge[{partition_id}]"))
    .register(&runtime.memory_pool);

merge_reservation.resize(EXTERNAL_SORTER_MERGE_RESERVATION);
Contributor Author

I take it as a positive sign that this was required to make the spill tests pass; without it the merge would exceed the memory limit and fail.

use tokio::task;

/// How much memory to reserve for performing in-memory sorts
const EXTERNAL_SORTER_MERGE_RESERVATION: usize = 10 * 1024 * 1024;
Contributor Author

I'm not a massive fan of this, but it somewhat patches around the issue that once we initiate a merge we can no longer spill.
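
A minimal sketch of the pattern described above, assuming the memory_pool API exposed by the datafusion crate (the consumer names, pool type, and sizes are illustrative, not the PR's exact code): a fixed merge budget is reserved per partition up front, the main reservation grows as batches are buffered and triggers spilling when the pool refuses, and the merge budget is freed just before the final merge so the merge streams can account against it.

use std::sync::Arc;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() -> datafusion::error::Result<()> {
    // Hypothetical 100 MiB pool standing in for runtime.memory_pool.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100 * 1024 * 1024));

    // Up-front headroom for the final merge, mirroring EXTERNAL_SORTER_MERGE_RESERVATION.
    let mut merge_reservation =
        MemoryConsumer::new("ExternalSorterMerge[0]").register(&pool);
    merge_reservation.resize(10 * 1024 * 1024);

    // Main reservation that grows as batches are buffered; when try_grow
    // fails the sorter spills, but the merge headroom stays untouched.
    let mut reservation = MemoryConsumer::new("ExternalSorter[0]").register(&pool);
    if reservation.try_grow(64 * 1024 * 1024).is_err() {
        // ... spill the buffered in-memory batches to disk here ...
    }

    // Just before merging the in-memory batches, release the headroom so
    // the merge streams can account their own memory against the pool.
    merge_reservation.free();
    Ok(())
}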

Contributor

The problem with this approach is that even 10MB may not be enough to correctly merge the batches prior to spilling. So some queries that today would succeed (though exceed their memory limits) might fail.

It seems to me better approaches (as follow-on PRs) would be:

  1. Make this a config parameter so users can avoid the error by reserving more memory up front if needed (a sketch follows this comment)
  2. Teach SortExec how to write more (smaller) spill files if it doesn't have enough memory to merge the in-memory batches.

However, given that the behavior on master today is to simply ignore the reservation and exceed the memory limit, this behavior seems better than before.

I suggest we merge this PR as-is and file a follow-on ticket for the improved behavior.
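
As a sketch of option 1 (illustrative only: in the code under review the value is the hard-coded constant above, though later DataFusion releases did grow a sort_spill_reservation_bytes execution option along these lines), the reservation size could be read from the session configuration instead of a constant:

use datafusion::prelude::SessionConfig;

// Illustrative only: read a per-session merge reservation size rather than
// using a hard-coded 10 MiB constant. The option shown exists in later
// DataFusion releases, not in the code under review here.
fn merge_reservation_bytes(config: &SessionConfig) -> usize {
    config.options().execution.sort_spill_reservation_bytes
}

Option 2 would instead change the spill path itself, writing several smaller spill files when a single merge does not fit in the budget, and needs a larger restructuring of the sorter.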

fn unregister(&self, consumer: &MemoryConsumer) {
    if consumer.can_spill {
-       self.state.lock().num_spill -= 1;
+       self.state.lock().num_spill.checked_sub(1).unwrap();
Contributor Author

@tustvold tustvold May 18, 2023

Drive-by sanity check (the first version of MemoryReservation::split would unregister the same consumer multiple times, and these debug checks are the only reason I noticed 😅).

Contributor

Maybe it would be worth adding some unit tests for MemoryReservation now, given it is growing in sophistication.
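
A sketch of the kind of unit test being suggested, assuming the MemoryReservation split/size and MemoryPool::reserved APIs as they appear in later datafusion releases (the names and exact assertions are illustrative):

use std::sync::Arc;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

#[test]
fn reservation_split_is_accounted_once() {
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
    let mut r = MemoryConsumer::new("test").register(&pool);

    r.try_grow(512).unwrap();
    assert_eq!(pool.reserved(), 512);

    // Splitting moves bytes into a new reservation without changing the
    // pool total (and must not unregister the consumer twice).
    let split = r.split(128);
    assert_eq!(r.size(), 384);
    assert_eq!(split.size(), 128);
    assert_eq!(pool.reserved(), 512);

    // Dropping the split hands its bytes back to the pool.
    drop(split);
    assert_eq!(pool.reserved(), 384);

    r.free();
    assert_eq!(pool.reserved(), 0);
}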

@tustvold tustvold force-pushed the sort-memory-accounting branch from 435337f to c6542e0 Compare May 18, 2023 16:17
@tustvold tustvold force-pushed the sort-memory-accounting branch from c6542e0 to d180c8d Compare May 18, 2023 16:41
alamb
alamb previously approved these changes May 18, 2023
Contributor

@alamb alamb left a comment

Thank you @tustvold -- I reviewed this code carefully and it makes sense to me.

However, when I ran the reproducer from https://github.com/influxdata/influxdb_iox/issues/7783 locally with this DataFusion patch, IOx still significantly exceeds its memory limit. I will post more details there.

While there are of course further improvements that could be made, I think this is already a significant improvement.

self.merge_reservation.free();

self.in_mem_batches = self
    .in_mem_sort_stream(self.metrics.baseline.intermediate())?
Contributor

I double-checked that in_mem_sort_stream correctly respects self.reservation 👍
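
For reference, "respects self.reservation" means roughly this pattern (a sketch; the function and variable names are illustrative, not the PR's exact code): every buffered batch first grows the reservation by its in-memory size, and a failed try_grow is what triggers a spill.

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::memory_pool::MemoryReservation;

// Sketch: account for a batch before buffering it in memory.
fn buffer_batch(
    batch: RecordBatch,
    in_mem_batches: &mut Vec<RecordBatch>,
    reservation: &mut MemoryReservation,
) -> Result<()> {
    // get_array_memory_size over-approximates shared buffers, which errs
    // on the safe side for accounting purposes.
    reservation.try_grow(batch.get_array_memory_size())?;
    in_mem_batches.push(batch);
    Ok(())
}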

use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task;

/// How much memory to reserve for performing in-memory sorts
Contributor

Suggested change:
- /// How much memory to reserve for performing in-memory sorts
+ /// How much memory to reserve for performing in-memory sorts prior to spill

/// Reservation for in_mem_batches
reservation: MemoryReservation,
partition_id: usize,
/// Reservation for in memory sorting of batches
Contributor

Suggested change:
- /// Reservation for in memory sorting of batches
+ /// Reservation for in memory sorting of batches, prior to spilling.
+ /// Without this reservation, when the memory budget is exhausted
+ /// it might not be possible to merge the in memory batches as part
+ /// of spilling.

rows: Rows,

#[allow(dead_code)]
Contributor

I think it would help to add a comment explaining why the code needs to keep a field that is never read (dead_code). I think it is to keep the reservation alive long enough?
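
One way to document it (a sketch of the idea, with illustrative names rather than the PR's actual struct): the field exists purely so the reservation is dropped, and the pool credited back, at the same time as the rows it accounts for.

use datafusion::arrow::row::Rows;
use datafusion::execution::memory_pool::MemoryReservation;

// The reservation is never read, but holding it ties the accounted bytes
// to the lifetime of `rows`: when this struct is dropped, MemoryReservation's
// Drop impl returns the bytes to the pool.
struct RowsWithReservation {
    rows: Rows,
    #[allow(dead_code)]
    reservation: MemoryReservation,
}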

@alamb
Contributor

alamb commented May 23, 2023

We have found another cause of the memory use in IOx downstream, but I still think this PR is valuable. Once we sort that out downstream, we'll try to get this one polished up and ready to go.

@alamb
Contributor

alamb commented May 24, 2023

I plan to try to help get this PR over the line in the next day or two.

@alamb alamb dismissed their stale review June 8, 2023 19:00

tests don't pass yet

@alamb alamb marked this pull request as draft June 8, 2023 19:00
@alamb
Contributor

alamb commented Jun 8, 2023

Converted to a draft as this PR is not ready to merge yet

@alamb
Contributor

alamb commented Jul 27, 2023

OK, I really do plan to pick this code up tomorrow and work on it.

@alamb
Contributor

alamb commented Jul 31, 2023

I have a new version of this code in #7130 that I am making progress on.

Successfully merging this pull request may close these issues.

SortPreservingMerge does not account for memory usage
